package com.bieber.server.remote;

import com.bieber.common.Constants;
import com.bieber.common.utils.NetUtils;
import com.bieber.registry.Notification;
import com.bieber.registry.Registry;
import com.bieber.server.config.ServerConfig;
import com.bieber.server.executor.NamedThreadFactory;
import com.bieber.server.netty.handler.Decoder;
import com.bieber.server.netty.handler.Encoder;
import com.bieber.server.netty.handler.NettyHandler;
import com.bieber.server.notifications.SenderNotification;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.commons.lang.StringUtils;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URLEncoder;
import java.util.List;

/**
 * Netty based server endpoint that also announces itself through the {@link Registry}.
 *
 * <p>Expected lifecycle: construct, {@link #initialize()}, {@link #start()}, {@link #stop()}.
 * {@link #start()} performs the initialization itself when it has not been done yet (or when
 * the previous resources were released by {@link #stop()}).
 *
 * <p>Only the {@code running} flag is published with {@code volatile}; the lifecycle methods
 * themselves are not synchronized and are expected to be driven by a single thread.
 *
 * Created by bieber on 2015/8/20.
 */
public class Server {

    /** {@code true} between a successful {@link #start()} and the next {@link #stop()}. */
    private volatile boolean running=false;

    private ServerBootstrap bootstrap;

    private EventLoopGroup bossGroup;

    private EventLoopGroup workerGroup;

    /** The bound server channel; {@code null} while the server is not listening. */
    private Channel channel;

    private ServerConfig config;

    private Registry registry;

    /**
     * Creates the server and performs the registry set-up that does not depend on the socket:
     * registers this node's configurator path, registers the nodes root path and subscribes to
     * the configurator path.
     *
     * @param config   server settings (node name, bind address, port, buffer sizes, charset)
     * @param registry registry used to publish and to watch nodes
     */
    public Server(ServerConfig config,Registry registry){
        this.config = config;
        this.registry = registry;

        String configuratorPath = Constants.CONFIGURATOR_PATH + "/" + this.config.getNodeName();
        // NOTE(review): the meaning of the 'false' flag is defined by Registry#dynamicRegister - confirm there.
        this.registry.dynamicRegister(configuratorPath, false);
        this.registry.staticRegister(Constants.NODES_PATH);
        this.registry.subscribe(configuratorPath, new Notification() {
            @Override
            public void notify(List<String> children) {
                // Intentionally empty: configurator changes are currently not acted upon.
            }
        });
    }

    /**
     * Creates the boss/worker event loop groups and configures the {@link ServerBootstrap}
     * (socket options and the decoder/encoder/handler pipeline). Does not bind any port.
     */
    public void initialize(){
        Constants.COMMON_LOGGER.info("initialize bootstrap server.....");
        bootstrap = new ServerBootstrap();
        bossGroup = new NioEventLoopGroup(1,new NamedThreadFactory("NETTY-SERVER-BOSS-GROUP"));
        workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(),new NamedThreadFactory("NETTY-SERVER-WORKER-GROUP"));
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                // Options of the listening socket itself.
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_RCVBUF, config.getServerSocketRcvBufSize())
                // Options of the accepted connections. SO_KEEPALIVE and SO_SNDBUF are not
                // recognised on the server socket channel (Netty only logs an
                // "Unknown channel option" warning there), so they must be child options
                // to take effect.
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.SO_SNDBUF, config.getServerSocketSndBufSize())
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline()
                                .addLast("decoder",new Decoder(config))
                                .addLast("encoder",new Encoder(config))
                                .addLast("handler",new NettyHandler());
                    }
                });
    }

    /**
     * Binds the server socket and, once the bind has succeeded, publishes this node under
     * {@code Constants.NODES_PATH} and subscribes to that path.
     *
     * <p>The bind address is {@code config.getBindAddress()} when it is non-empty, otherwise the
     * wildcard address; the port is always {@code config.getPort()}.
     *
     * @throws IllegalStateException if the server is already running
     * @throws RuntimeException      if binding or registering fails; the original failure is
     *                               attached as the cause
     */
    public void start(){
        if(running){
            throw new IllegalStateException("Server had started!");
        }
        if(bootstrap == null){
            // Not initialized yet, or the resources were released by stop().
            initialize();
        }
        SocketAddress socketAddress = new InetSocketAddress(config.getPort());
        if(!StringUtils.isEmpty(config.getBindAddress())){
            socketAddress = new InetSocketAddress(config.getBindAddress(), config.getPort());
        }

        // Bind first: the node must not be advertised before it is actually listening.
        ChannelFuture bindFuture = bootstrap.bind(socketAddress);
        try {
            bindFuture.sync();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller and do not leave a half-open channel.
            Thread.currentThread().interrupt();
            bindFuture.channel().close();
            throw new RuntimeException("Transporter server bind error",e);
        } catch (Exception e) {
            throw new RuntimeException("Transporter server bind error",e);
        }
        channel = bindFuture.channel();
        Constants.COMMON_LOGGER.info("started bootstrap server listened on "+socketAddress);

        try {
            registry.dynamicRegister(buildNodePath());
            registry.subscribe(Constants.NODES_PATH,new SenderNotification(config));
        } catch (Exception e) {
            // A server that could not be published is of no use: release the port again.
            channel.close();
            channel = null;
            throw new RuntimeException("Transporter server register error",e);
        }
        running = true;
    }

    /**
     * Stops the server: closes the listening channel (if any) and requests a graceful shutdown
     * of both event loop groups. Safe to call when the server was never started.
     *
     * <p>Neither the close nor the shutdown is waited for, so this method does not block.
     */
    public void stop() {
        Constants.COMMON_LOGGER.info("stopping bootstrap server......");
        Channel serverChannel = channel;
        channel = null;
        if(serverChannel != null){
            serverChannel.close();
        }
        releaseEventLoops();
        running = false;
        Constants.COMMON_LOGGER.info("stopped bootstrap server......");
    }

    /**
     * Builds the registry path of this node:
     * {@code NODES_PATH + "/" + urlEncode(nodeName + "//" + host + ":" + port)}, where host is the
     * configured bind address or, when that is empty, the local address from {@link NetUtils}.
     */
    private String buildNodePath() throws java.io.UnsupportedEncodingException {
        String host = config.getBindAddress();
        if(StringUtils.isEmpty(host)){
            host = NetUtils.getLocalAddress().getHostAddress();
        }
        String node = config.getNodeName()+"//"+host+":"+config.getPort();
        return Constants.NODES_PATH+"/"+URLEncoder.encode(node,config.getCharSet());
    }

    /** Requests shutdown of the event loop groups and drops the bootstrap that referenced them. */
    private void releaseEventLoops() {
        if(bossGroup != null){
            bossGroup.shutdownGracefully();
            bossGroup = null;
        }
        if(workerGroup != null){
            workerGroup.shutdownGracefully();
            workerGroup = null;
        }
        // The bootstrap is bound to the groups above; force start() to build a fresh one.
        bootstrap = null;
    }
}
